home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / python-support / python-rdflib / rdflib / store / MySQL.py < prev    next >
Encoding:
Python Source  |  2007-04-04  |  20.2 KB  |  514 lines

  1. from __future__ import generators
  2. from rdflib import BNode
  3. from rdflib.store import Store,VALID_STORE, CORRUPTED_STORE, NO_STORE, UNKNOWN
  4. from rdflib.Literal import Literal
  5. from pprint import pprint
  6. import MySQLdb,sys
  7. from rdflib.term_utils import *
  8. from rdflib.Graph import QuotedGraph
  9. from rdflib.store.REGEXMatching import REGEXTerm, NATIVE_REGEX, PYTHON_REGEX
  10. from rdflib.store.AbstractSQLStore import *
  11. from FOPLRelationalModel.RelationalHash import IdentifierHash, LiteralHash, RelationalHash, GarbageCollectionQUERY
  12. from FOPLRelationalModel.BinaryRelationPartition import *
  13. from FOPLRelationalModel.QuadSlot import *
  14.  
  15. Any = None
  16.  
  17. def ParseConfigurationString(config_string):
  18.     """
  19.     Parses a configuration string in the form:
  20.     key1=val1,key2=val2,key3=val3,...
  21.     The following configuration keys are expected (not all are required):
  22.     user
  23.     password
  24.     db
  25.     host
  26.     port (optional - defaults to 3306)
  27.     """
  28.     kvDict = dict([(part.split('=')[0],part.split('=')[-1]) for part in config_string.split(',')])
  29.     for requiredKey in ['user','db','host']:
  30.         assert requiredKey in kvDict
  31.     if 'port' not in kvDict:
  32.         kvDict['port']=3306
  33.     if 'password' not in kvDict:
  34.         kvDict['password']=''
  35.     return kvDict
  36.  
  37. def createTerm(termString,termType,store,objLanguage=None,objDatatype=None):
  38.     if termType == 'L':
  39.         cache = store.literalCache.get((termString,objLanguage,objDatatype))
  40.         if cache is not None:
  41.             #store.cacheHits += 1
  42.             return cache
  43.         else:
  44.             #store.cacheMisses += 1
  45.             rt = Literal(termString,objLanguage,objDatatype)
  46.             store.literalCache[((termString,objLanguage,objDatatype))] = rt
  47.             return rt
  48.     elif termType=='F':
  49.         cache = store.otherCache.get((termType,termString))
  50.         if cache is not None:
  51.             #store.cacheHits += 1
  52.             return cache
  53.         else:
  54.             #store.cacheMisses += 1
  55.             rt = QuotedGraph(store,URIRef(termString))
  56.             store.otherCache[(termType,termString)] = rt
  57.             return rt
  58.     elif termType == 'B':
  59.         cache = store.bnodeCache.get((termString))
  60.         if cache is not None:
  61.             #store.cacheHits += 1
  62.             return cache
  63.         else:
  64.             #store.cacheMisses += 1
  65.             rt = TERM_INSTANCIATION_DICT[termType](termString)
  66.             store.bnodeCache[(termString)] = rt
  67.             return rt
  68.     elif termType =='U':
  69.         cache = store.uriCache.get((termString))
  70.         if cache is not None:
  71.             #store.cacheHits += 1
  72.             return cache
  73.         else:
  74.             #store.cacheMisses += 1
  75.             rt = URIRef(termString)
  76.             store.uriCache[(termString)] = rt
  77.             return rt
  78.     else:
  79.         cache = store.otherCache.get((termType,termString))
  80.         if cache is not None:
  81.             #store.cacheHits += 1
  82.             return cache
  83.         else:
  84.             #store.cacheMisses += 1
  85.             rt = TERM_INSTANCIATION_DICT[termType](termString)
  86.             store.otherCache[(termType,termString)] = rt
  87.             return rt
  88.  
  89. def extractTriple(tupleRt,store,hardCodedContext=None):
  90.     subject,sTerm,predicate,pTerm,obj,oTerm,rtContext,cTerm,objDatatype,objLanguage = tupleRt
  91.     context = rtContext is not None and rtContext or hardCodedContext.identifier
  92.  
  93.     s=createTerm(subject,sTerm,store)
  94.     p=createTerm(predicate,pTerm,store)
  95.     o=createTerm(obj,oTerm,store,objLanguage,objDatatype)
  96.  
  97.     graphKlass, idKlass = constructGraph(cTerm)
  98.     return s,p,o,(graphKlass,idKlass,context)
  99.  
  100.  
  101. class MySQL(Store):
  102.     """
  103.     MySQL implementation of FOPL Relational Model as an rdflib Store
  104.     """
  105.     context_aware = True
  106.     formula_aware = True
  107.     transaction_aware = True
  108.     regex_matching = NATIVE_REGEX
  109.  
  110.     def __init__(self, identifier=None, configuration=None):
  111.         self.identifier = identifier and identifier or 'hardcoded'
  112.         #Use only the first 10 bytes of the digest
  113.         self._internedId = INTERNED_PREFIX + sha.new(self.identifier).hexdigest()[:10]
  114.  
  115.         #Setup FOPL RelationalModel objects
  116.         self.idHash = IdentifierHash(self._internedId)
  117.         self.valueHash = LiteralHash(self._internedId)
  118.         self.binaryRelations = NamedBinaryRelations(self._internedId,self.idHash,self.valueHash)
  119.         self.literalProperties = NamedLiteralProperties(self._internedId,self.idHash,self.valueHash)
  120.         self.aboxAssertions = AssociativeBox(self._internedId,self.idHash,self.valueHash)
  121.         self.tables = [
  122.                        self.binaryRelations,
  123.                        self.literalProperties,
  124.                        self.aboxAssertions,
  125.                        self.idHash,
  126.                        self.valueHash
  127.                        ]
  128.         self.createTables = [
  129.                        self.idHash,
  130.                        self.valueHash,
  131.                        self.binaryRelations,
  132.                        self.literalProperties,
  133.                        self.aboxAssertions
  134.                        ]
  135.         self.hashes = [self.idHash,self.valueHash]
  136.         self.partitions = [self.literalProperties,self.binaryRelations,self.aboxAssertions,]
  137.  
  138.         #This parameter controls how exlusively the literal table is searched
  139.         #If true, the Literal partition is searched *exclusively* if the object term
  140.         #in a triple pattern is a Literal or a REGEXTerm.  Note, the latter case
  141.         #prevents the matching of URIRef nodes as the objects of a triple in the store.
  142.         #If the object term is a wildcard (None)
  143.         #Then the Literal paritition is searched in addition to the others
  144.         #If this parameter is false, the literal partition is searched regardless of what the object
  145.         #of the triple pattern is
  146.         self.STRONGLY_TYPED_TERMS = False
  147.         self._db = None
  148.         if configuration is not None:
  149.             self.open(configuration)
  150.  
  151.         self.cacheHits = 0
  152.         self.cacheMisses = 0
  153.  
  154.         self.literalCache = {}
  155.         self.uriCache = {}
  156.         self.bnodeCache = {}
  157.         self.otherCache = {}
  158.  
  159.     def executeSQL(self,cursor,qStr,params=None,paramList=False):
  160.         """
  161.         Overridded in order to pass params seperate from query for MySQLdb
  162.         to optimize
  163.         """
  164.         #self._db.autocommit(False)
  165.         if params is None:
  166.             cursor.execute(qStr)
  167.         elif paramList:
  168.             cursor.executemany(qStr,[tuple(item) for item in params])
  169.         else:
  170.             cursor.execute(qStr,tuple(params))
  171.  
  172.     #Database Management Methods
  173.     def open(self, configuration, create=False):
  174.         """
  175.         Opens the store specified by the configuration string. If
  176.         create is True a store will be created if it does not already
  177.         exist. If create is False and a store does not already exist
  178.         an exception is raised. An exception is also raised if a store
  179.         exists, but there is insufficient permissions to open the
  180.         store.
  181.         """
  182.         configDict = ParseConfigurationString(configuration)
  183.         if create:
  184.             test_db = MySQLdb.connect(user=configDict['user'],
  185.                                       passwd=configDict['password'],
  186.                                       db='test',
  187.                                       port=configDict['port'],
  188.                                       host=configDict['host'],
  189.                                       #use_unicode=True,
  190.                                       #read_default_file='/etc/my-client.cnf'
  191.                                       )
  192.             c=test_db.cursor()
  193.             c.execute("""SET AUTOCOMMIT=0""")
  194.             c.execute("""SHOW DATABASES""")
  195.             if not (configDict['db'].encode('utf-8'),) in c.fetchall():
  196.                 print "creating %s (doesn't exist)"%(configDict['db'])
  197.                 c.execute("""CREATE DATABASE %s"""%(configDict['db'],))
  198.                 test_db.commit()
  199.                 c.close()
  200.                 test_db.close()
  201.  
  202.             db = MySQLdb.connect(user = configDict['user'],
  203.                                  passwd = configDict['password'],
  204.                                  db=configDict['db'],
  205.                                  port=configDict['port'],
  206.                                  host=configDict['host'],
  207.                                  #use_unicode=True,
  208.                                  #read_default_file='/etc/my-client.cnf'
  209.                                  )
  210.             c=db.cursor()
  211.             c.execute("""SET AUTOCOMMIT=0""")
  212.             c.execute(CREATE_NS_BINDS_TABLE%(self._internedId))
  213.             for kb in self.createTables:
  214.                 c.execute(kb.createSQL())
  215.                 if isinstance(kb,RelationalHash) and kb.defaultSQL():
  216.                     c.execute(kb.defaultSQL())
  217.  
  218.             db.commit()
  219.             c.close()
  220.             db.close()
  221.         try:
  222.             port = int(configDict['port'])
  223.         except:
  224.             raise ArithmeticError('MySQL port must be a valid integer')
  225.         self._db = MySQLdb.connect(user = configDict['user'],
  226.                                    passwd = configDict['password'],
  227.                                    db=configDict['db'],
  228.                                    port=port,
  229.                                    host=configDict['host'],
  230.                                    #use_unicode=True,
  231.                                    #read_default_file='/etc/my.cnf'
  232.                                   )
  233.         self._db.autocommit(False)
  234.         c=self._db.cursor()
  235.         c.execute("""SHOW DATABASES""")
  236.         #FIXME This is a character set hack.  See: http://sourceforge.net/forum/forum.php?thread_id=1448424&forum_id=70461
  237.         #self._db.charset = 'utf8'
  238.         rt = c.fetchall()
  239.         if (configDict['db'].encode('utf-8'),) in rt:
  240.             for tn in self.tables:
  241.                 c.execute("""show tables like '%s'"""%(tn,))
  242.                 rt=c.fetchall()
  243.                 if not rt:
  244.                     sys.stderr.write("table %s Doesn't exist\n" % (tn));
  245.                     #The database exists, but one of the partitions doesn't exist
  246.                     return CORRUPTED_STORE
  247.             #Everything is there (the database and the partitions)
  248.             return VALID_STORE
  249.         #The database doesn't exist - nothing is there
  250.         return NO_STORE
  251.  
  252.     def destroy(self, configuration):
  253.         """
  254.         FIXME: Add documentation
  255.         """
  256.         configDict = ParseConfigurationString(configuration)
  257.         msql_db = MySQLdb.connect(user=configDict['user'],
  258.                                 passwd=configDict['password'],
  259.                                 db=configDict['db'],
  260.                                 port=configDict['port'],
  261.                                 host=configDict['host']
  262.                                 )
  263.         msql_db.autocommit(False)
  264.         c=msql_db.cursor()
  265.         for tbl in self.tables + ["%s_namespace_binds"%self._internedId]:
  266.             try:
  267.                 c.execute('DROP table %s'%tbl)
  268.                 #print "dropped table: %s"%(tblsuffix%(self._internedId))
  269.             except Exception, e:
  270.                 print "unable to drop table: %s"%(tbl)
  271.                 print e
  272.  
  273.         #Note, this only removes the associated tables for the closed world universe given by the identifier
  274.         print "Destroyed Close World Universe %s ( in MySQL database %s)"%(self.identifier,configDict['db'])
  275.         msql_db.commit()
  276.         msql_db.close()
  277.  
  278.     #Transactional interfaces
  279.     def commit(self):
  280.         """ """
  281.         self._db.commit()
  282.  
  283.     def rollback(self):
  284.         """ """
  285.         self._db.rollback()
  286.  
  287.     def gc(self):
  288.         """
  289.         Purges unreferenced identifiers / values - expensive
  290.         """
  291.         c=self._db.cursor()
  292.         purgeQueries = GarbageCollectionQUERY(
  293.                                                self.idHash,
  294.                                                self.valueHash,
  295.                                                self.binaryRelations,
  296.                                                self.aboxAssertions,
  297.                                                self.literalProperties)
  298.  
  299.         for q in purgeQueries:
  300.             self.executeSQL(c,q)
  301.  
  302.     def add(self, (subject, predicate, obj), context=None, quoted=False):
  303.         """ Add a triple to the store of triples. """
  304.         qSlots = genQuadSlots([subject,predicate,obj,context])
  305.         if predicate == RDF.type:
  306.             kb = self.aboxAssertions
  307.         elif isinstance(obj,Literal):
  308.             kb = self.literalProperties
  309.         else:
  310.             kb = self.binaryRelations
  311.         kb.insertRelations([qSlots])
  312.         kb.flushInsertions(self._db)
  313.  
  314.     def addN(self, quads):
  315.         """
  316.         Adds each item in the list of statements to a specific context. The quoted argument
  317.         is interpreted by formula-aware stores to indicate this statement is quoted/hypothetical.
  318.         Note that the default implementation is a redirect to add
  319.         """
  320.         for s,p,o,c in quads:
  321.             assert c is not None, "Context associated with %s %s %s is None!"%(s,p,o)
  322.             qSlots = genQuadSlots([s,p,o,c])
  323.             if p == RDF.type:
  324.                 kb = self.aboxAssertions
  325.             elif isinstance(o,Literal):
  326.                 kb = self.literalProperties
  327.             else:
  328.                 kb = self.binaryRelations
  329.  
  330.             kb.insertRelations([qSlots])
  331.  
  332.         for kb in self.partitions:
  333.             if kb.pendingInsertions:
  334.                 kb.flushInsertions(self._db)
  335.  
  336.     def remove(self, (subject, predicate, obj), context):
  337.         """ Remove a triple from the store """
  338.         targetBRPs = BinaryRelationPartitionCoverage((subject,predicate,obj,context),self.partitions)
  339.         c=self._db.cursor()
  340.         for brp in targetBRPs:
  341.             query = "DELETE %s from %s %s WHERE "%(
  342.                                           brp,
  343.                                           brp,
  344.                                           brp.generateHashIntersections()
  345.                                         )
  346.             whereClause,whereParameters = brp.generateWhereClause((subject,predicate,obj,context))
  347.             self.executeSQL(c,query+whereClause,params=whereParameters)
  348.  
  349.         c.close()
  350.  
  351.     def triples(self, (subject, predicate, obj), context=None):
  352.         c=self._db.cursor()
  353.         if context is None or isinstance(context.identifier,REGEXTerm):
  354.             rt=PatternResolution((subject,predicate,obj,context),c,self.partitions,fetchall=False)
  355.         else:
  356.             #No need to order by triple (expensive), all result sets will be in the same context
  357.             rt=PatternResolution((subject,predicate,obj,context),c,self.partitions,orderByTriple=False,fetchall=False)
  358.         while rt:
  359.             s,p,o,(graphKlass,idKlass,graphId) = extractTriple(rt,self,context)
  360.             currentContext=(context is None or isinstance(context.identifier,REGEXTerm)) and graphKlass(self,idKlass(graphId)) or context
  361.             contexts = [currentContext]
  362.             rt = next = c.fetchone()
  363.             if context is None or isinstance(context.identifier,REGEXTerm):
  364.                 sameTriple = next and extractTriple(next,self,context)[:3] == (s,p,o)
  365.                 while sameTriple:
  366.                     s2,p2,o2,(graphKlass,idKlass,graphId) = extractTriple(next,self,context)
  367.                     c2 = graphKlass(self,idKlass(graphId))
  368.                     contexts.append(c2)
  369.                     rt = next = c.fetchone()
  370.                     sameTriple = next and extractTriple(next,self,context)[:3] == (s,p,o)
  371.  
  372.             yield (s,p,o),(c for c in contexts)
  373.  
  374.     def triples_choices(self, (subject, predicate, object_),context=None):
  375.         """
  376.         A variant of triples that can take a list of terms instead of a single
  377.         term in any slot.  Stores can implement this to optimize the response time
  378.         from the import default 'fallback' implementation, which will iterate
  379.         over each term in the list and dispatch to tripless
  380.         """
  381.         if isinstance(object_,list):
  382.             assert not isinstance(subject,list), "object_ / subject are both lists"
  383.             assert not isinstance(predicate,list), "object_ / predicate are both lists"
  384.             if not object_:
  385.                 object_ = None
  386.             for (s1, p1, o1), cg in self.triples((subject,predicate,object_),context):
  387.                 yield (s1, p1, o1), cg
  388.  
  389.         elif isinstance(subject,list):
  390.             assert not isinstance(predicate,list), "subject / predicate are both lists"
  391.             if not subject:
  392.                 subject = None
  393.             for (s1, p1, o1), cg in self.triples((subject,predicate,object_),context):
  394.                 yield (s1, p1, o1), cg
  395.  
  396.         elif isinstance(predicate,list):
  397.             assert not isinstance(subject,list), "predicate / subject are both lists"
  398.             if not predicate:
  399.                 predicate = None
  400.             for (s1, p1, o1), cg in self.triples((subject,predicate,object_),context):
  401.                 yield (s1, p1, o1), cg
  402.  
  403.     def __repr__(self):
  404.         c=self._db.cursor()
  405.  
  406.         rtDict = {}
  407.         countRows = "select count(*) from %s"
  408.         countContexts = "select DISTINCT %s from %s"
  409.         unionSelect = ' union '.join([countContexts%(part.columnNames[CONTEXT],str(part)) for part in self.partitions])
  410.         self.executeSQL(c,unionSelect)
  411.         ctxCount = len(c.fetchall())
  412.         for part in self.partitions:
  413.             self.executeSQL(c,countRows%part)
  414.             rowCount = c.fetchone()[0]
  415.             rtDict[str(part)]=rowCount
  416.         return "<Parititioned MySQL N3 Store: %s context(s), %s classification(s), %s property/value assertion(s), and %s other relation(s)>"%(
  417.             ctxCount,
  418.             rtDict[str(self.aboxAssertions)],
  419.             rtDict[str(self.literalProperties)],
  420.             rtDict[str(self.binaryRelations)],
  421.         )
  422.  
  423.     def __len__(self, context=None):
  424.         rows = []
  425.         countRows = "select count(*) from %s"
  426.         c=self._db.cursor()
  427.         for part in self.partitions:
  428.             if context is not None:
  429.                 whereClause,whereParams = part.generateWhereClause((None,None,None,context.identifier)) 
  430.                 self.executeSQL(c,countRows%part + " where " + whereClause,whereParams)
  431.             else:
  432.                 self.executeSQL(c,countRows%part)
  433.             rowCount = c.fetchone()[0]
  434.             rows.append(rowCount)
  435.         return reduce(lambda x,y: x+y,rows)
  436.  
  437.     def contexts(self, triple=None):
  438.         c=self._db.cursor()
  439.         if triple:
  440.             subject,predicate,obj = triple
  441.         else:
  442.             subject = predicate = obj = None
  443.         rt=PatternResolution((subject,predicate,obj,None),
  444.                               c,
  445.                               self.partitions,
  446.                               fetchall=False,
  447.                               fetchContexts=True)
  448.         while rt:
  449.             contextId,cTerm = rt
  450.             graphKlass, idKlass = constructGraph(cTerm)
  451.             yield graphKlass(self,idKlass(contextId))
  452.             rt = c.fetchone()
  453.  
  454.     #Namespace persistence interface implementation
  455.     def bind(self, prefix, namespace):
  456.         """ """
  457.         c=self._db.cursor()
  458.         try:
  459.             self.executeSQL(
  460.                 c,
  461.                 "INSERT INTO %s_namespace_binds VALUES ('%s', '%s')"%(
  462.                 self._internedId,
  463.                 prefix,
  464.                 namespace)
  465.             )
  466.         except:
  467.             pass
  468.         c.close()
  469.  
  470.     def prefix(self, namespace):
  471.         """ """
  472.         c=self._db.cursor()
  473.         self.executeSQL(c,"select prefix from %s_namespace_binds where uri = '%s'"%(
  474.             self._internedId,
  475.             namespace)
  476.         )
  477.         rt = [rtTuple[0] for rtTuple in c.fetchall()]
  478.         c.close()
  479.         return rt and rt[0] or None
  480.  
  481.     def namespace(self, prefix):
  482.         """ """
  483.         c=self._db.cursor()
  484.         try:
  485.             self.executeSQL(c,"select uri from %s_namespace_binds where prefix = '%s'"%(
  486.                 self._internedId,
  487.                 prefix)
  488.                       )
  489.         except:
  490.             return None
  491.         rt = [rtTuple[0] for rtTuple in c.fetchall()]
  492.         c.close()
  493.         return rt and rt[0] or None
  494.  
  495.     def namespaces(self):
  496.         """ """
  497.         c=self._db.cursor()
  498.         self.executeSQL(c,"select prefix, uri from %s_namespace_binds where 1;"%(
  499.             self._internedId
  500.             )
  501.         )
  502.         rt=c.fetchall()
  503.         c.close()
  504.         for prefix,uri in rt:
  505.             yield prefix,uri
  506.  
  507.  
# DDL for the per-store namespace-bindings table.  The %s is filled with the
# store's interned id (MySQL.open formats it with self._internedId).  prefix
# is the primary key, so each prefix can be bound to at most one URI.  uri is
# indexed on its first 100 characters.
CREATE_NS_BINDS_TABLE = """
CREATE TABLE %s_namespace_binds (
    prefix        varchar(20) UNIQUE not NULL,
    uri           text,
    PRIMARY KEY (prefix),
    INDEX uri_index (uri(100))) ENGINE=InnoDB"""
  514.